import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Author :  Rocky
 * Date : 24/05/2017 23:08
 * Description : Producer/consumer throughput benchmark. Five producer threads
 * each emit 400 MB worth of single-byte write requests for their own
 * memory-mapped file; all requests travel through one bounded queue to five
 * writer threads, which perform the actual writes into the mapped buffers.
 * The elapsed time is measured from start-up until every request has been
 * handled by a writer thread.
 * Test : run {@link #main(String[])}; it creates testfile0..testfile4 in the
 * working directory.
 */
public class MultiThreadMultiFiles_Produce_Consumer_Test2 {

    /** Number of threads that drain the queue and write into the mapped buffers. */
    private static final int WRITER_COUNT = 5;

    /** Number of target files; one producer thread is started per file. */
    private static final int FILE_COUNT = 5;

    /** Number of bytes (= number of write requests) each producer sends: 400 MB. */
    private static final int BYTES_PER_FILE = 400 * 1024 * 1024;

    /** Capacity of the bounded request queue shared by producers and writers. */
    private static final int QUEUE_CAPACITY = 5000000;

    /**
     * Counts the requests that still have to be handled by a writer thread.
     * {@code multiplyExact} fails fast if the constants are ever changed to a
     * combination that no longer fits into an {@code int}.
     */
    private static final CountDownLatch wroteByteCount =
            new CountDownLatch(Math.multiplyExact(FILE_COUNT, BYTES_PER_FILE));

    /** How often a producer found the queue full right before enqueueing. */
    private static final AtomicInteger fullCount = new AtomicInteger(0);

    /** How often a writer found the queue empty right before dequeueing. */
    private static final AtomicInteger emptyCount = new AtomicInteger(0);

    /**
     * Runs the benchmark and prints the elapsed time, the throughput and the
     * queue full/empty statistics.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while
     *                              waiting for the writers to finish
     * @throws IOException          if a target file cannot be created or mapped
     */
    public static void main(String[] args) throws InterruptedException, IOException {
        long s = System.currentTimeMillis();

        WriteRequestQueue writeRequestQueue = new WriteRequestQueue(QUEUE_CAPACITY);

        // Start the writer threads. They loop forever, so they are daemon
        // threads: the JVM may exit once main() returns. This is safe because
        // main() only returns after the latch confirms every request was handled.
        for (int i = 0; i < WRITER_COUNT; i++) {
            Thread writer = new Thread(new WriterRunnable(writeRequestQueue));
            writer.setDaemon(true);
            writer.start();
        }
        System.out.println(WRITER_COUNT + " 个写进程启动完毕");

        // Start one producer per file; each sends BYTES_PER_FILE single-byte requests.
        for (int i = 0; i < FILE_COUNT; i++) {
            MappedByteBuffer mbb = mapFreshFile("testfile" + i, BYTES_PER_FILE);
            MappedByteBufferWrapper mbbWrapper = new MappedByteBufferWrapper(mbb);
            Thread producer = new Thread(() -> {
                for (int sent = 0; sent < BYTES_PER_FILE; sent++) {
                    writeRequestQueue.put(new WriteRequest((byte) 4, mbbWrapper));
                }
            });
            producer.start();
        }
        System.out.println(FILE_COUNT + " 发送数据的进程启动完毕");

        // Released by the writers, i.e. only after the final write operations.
        wroteByteCount.await();

        long time = (System.currentTimeMillis() - s);
        long totalBytes = (long) FILE_COUNT * BYTES_PER_FILE;
        System.out.println("5个线程分别往5个文件中发送数据，每个线程发送400m数据，通过queue转交给另外5个线程执行最终的写操作， 总耗时 " + time + " ms ，速度 " + throughputKbPerSecond(totalBytes, time) + " kb/s");
        System.out.println("整个过程请求队列 空 " + emptyCount.get() + " 次，满 " + fullCount.get() + " 次");
    }

    /**
     * Deletes any existing file of the given name, then creates it anew and
     * maps its first {@code size} bytes read/write.
     *
     * @param fileName path of the file to (re)create
     * @param size     number of bytes to map
     * @return the mapped buffer; it stays usable after the file handle is
     *         closed, because a mapping does not depend on the channel that
     *         created it
     * @throws IOException if the file cannot be opened or mapped
     */
    private static MappedByteBuffer mapFreshFile(String fileName, int size) throws IOException {
        File file = new File(fileName);
        if (file.exists()) {
            file.delete();
        }
        // try-with-resources: the handle is released even when map() throws.
        try (RandomAccessFile raf = new RandomAccessFile(file, "rw")) {
            return raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, size);
        }
    }

    /**
     * Converts a byte count and a duration into a rate in KB per second.
     *
     * @param totalBytes    number of bytes transferred
     * @param elapsedMillis duration in milliseconds; {@code 0} yields
     *                      {@code Infinity} (floating-point division)
     * @return throughput in kb/s
     */
    private static double throughputKbPerSecond(long totalBytes, long elapsedMillis) {
        return (totalBytes / 1024.0) / (elapsedMillis / 1000.0);
    }


    /** A request to append one byte to the buffer held by a wrapper. */
    static class WriteRequest {

        private final byte data;

        private final MappedByteBufferWrapper mbbWrapper;

        /**
         * @param data       the byte to write
         * @param mbbWrapper wrapper around the buffer that receives the byte
         */
        public WriteRequest(byte data, MappedByteBufferWrapper mbbWrapper) {
            this.data = data;
            this.mbbWrapper = mbbWrapper;
        }
    }


    /** Wraps the {@link MappedByteBuffer} of one target file. */
    static class MappedByteBufferWrapper {

        private final MappedByteBuffer mbb;

        /**
         * @param mbb the mapped buffer to wrap
         */
        public MappedByteBufferWrapper(MappedByteBuffer mbb) {
            this.mbb = mbb;
        }
    }


    /**
     * Bounded blocking queue of write requests that additionally records how
     * often it was observed full (on put) or empty (on take).
     */
    static class WriteRequestQueue {

        private final LinkedBlockingQueue<WriteRequest> queue;

        private final int queueSize;

        /**
         * @param queueSize maximum number of pending requests
         */
        public WriteRequestQueue(int queueSize) {
            this.queueSize = queueSize;
            this.queue = new LinkedBlockingQueue<>(queueSize);
        }

        /**
         * Enqueues a request, blocking while the queue is full. An interrupt
         * does not abort the operation; the call retries until the request is
         * enqueued and then restores the thread's interrupt status so the
         * caller can still react to it.
         *
         * @param request the request to enqueue
         */
        private void put(WriteRequest request) {
            // Statistics only: the size check is not atomic with the put below.
            if (queue.size() == queueSize) {
                fullCount.incrementAndGet();
            }

            boolean interrupted = false;
            try {
                while (true) {
                    try {
                        queue.put(request);
                        return;
                    } catch (InterruptedException e) {
                        // Remember the interrupt and retry. Re-asserting it
                        // here would make the next put() throw immediately.
                        interrupted = true;
                        System.out.println("不严重异常 " + e.getMessage());
                    }
                }
            } finally {
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }

        /**
         * Dequeues the next request, blocking while the queue is empty. An
         * interrupt does not abort the operation; the call retries until a
         * request is available and then restores the thread's interrupt status.
         *
         * @return the next request, never {@code null}
         */
        private WriteRequest take() {
            // Statistics only: the size check is not atomic with the take below.
            if (queue.size() == 0) {
                emptyCount.incrementAndGet();
            }

            boolean interrupted = false;
            try {
                while (true) {
                    try {
                        return queue.take();
                    } catch (InterruptedException e) {
                        // Remember the interrupt and retry; see put().
                        interrupted = true;
                        System.out.println("不严重异常 " + e.getMessage());
                    }
                }
            } finally {
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }


    /**
     * Work loop of a writer thread: takes requests from the queue and writes
     * their byte into the target buffer.
     */
    static class WriterRunnable implements Runnable {

        private final WriteRequestQueue writeRequestQueue;

        /**
         * @param writeRequestQueue the queue to drain
         */
        public WriterRunnable(WriteRequestQueue writeRequestQueue) {
            this.writeRequestQueue = writeRequestQueue;
        }


        /**
         * Processes requests until the thread is interrupted. Every request
         * that was taken from the queue counts the completion latch down
         * exactly once, whether the write succeeded, was dropped because the
         * buffer is full, or failed.
         */
        @Override
        public void run() {
            // take() restores the interrupt status, so checking it here stops
            // the loop after the request taken during the interrupt is handled.
            while (!Thread.currentThread().isInterrupted()) {
                WriteRequest writeRequest = writeRequestQueue.take();
                try {
                    MappedByteBuffer target = writeRequest.mbbWrapper.mbb;

                    // Several writers may receive requests for the same buffer;
                    // the buffer's position is shared state, so serialize on it.
                    synchronized (target) {
                        if (target.hasRemaining()) {
                            target.put(writeRequest.data);
                        } else {
                            System.out.println("文件写满了，没空间了");
                        }
                    }
                } catch (Throwable e) {
                    // Deliberately broad: accessing a mapped region that became
                    // inaccessible may raise an unspecified exception, and one
                    // failed request must not stop the writer.
                    System.out.println("写数据异常，将丢弃该信息 " + e.getMessage());
                } finally {
                    // Counted here, after the write, so that the measured time
                    // includes the final write operations.
                    wroteByteCount.countDown();
                }
            }
        }
    }
}
